阶段 A:数据基础设施
本阶段为量化中台的地基,目标是建立统一的时序数据存储、标准化采集框架、查询API和管理界面。
总览
| 子任务 |
内容 |
优先级 |
| A1 |
TimescaleDB 时序数据库 |
P0 - 第一步 |
| A2 |
Connector 数据采集框架 |
P0 - 第二步 |
| A5 |
数据查询 API |
P0 - 第三步 |
| A6 |
Dashboard 数据管理页 |
P1 - 第四步 |
| A3 |
数据调度器 |
P1 - 第五步 |
| A4 |
数据质量监控 |
P1 - 第六步 |
A1. TimescaleDB 时序数据库
目标
安装 PostgreSQL + TimescaleDB 扩展,替代 zip 文件存储,建立 4 层数据模型。
数据层级
| 层级 |
名称 |
说明 |
表名 |
| L1 |
行情 |
OHLCV + VWAP |
market_ohlcv |
| L2 |
Flow |
期权流、大单、暗池 |
flow_data |
| L3 |
Info |
新闻、公告、情绪 |
info_data |
| L4 |
另类 |
社媒、链上、另类信号 |
alt_data |
核心表结构
market_ohlcv (L1 行情)
CREATE TABLE market_ohlcv (
time TIMESTAMPTZ NOT NULL,
symbol TEXT NOT NULL,
source TEXT NOT NULL,
timeframe TEXT NOT NULL, -- '1d', '1h', '5m', '1m'
open FLOAT8,
high FLOAT8,
low FLOAT8,
close FLOAT8,
volume BIGINT,
vwap FLOAT8
);
SELECT create_hypertable('market_ohlcv', 'time');
CREATE INDEX idx_ohlcv_symbol_time ON market_ohlcv (symbol, time DESC);
flow_data (L2 资金流)
CREATE TABLE flow_data (
time TIMESTAMPTZ NOT NULL,
symbol TEXT NOT NULL,
source TEXT NOT NULL,
flow_type TEXT, -- 'option', 'darkpool', 'block'
side TEXT, -- 'call', 'put', 'buy', 'sell'
premium FLOAT8,
volume BIGINT,
strike FLOAT8,
expiry DATE,
raw_data JSONB
);
SELECT create_hypertable('flow_data', 'time');
info_data (L3 信息)
CREATE TABLE info_data (
time TIMESTAMPTZ NOT NULL,
source TEXT NOT NULL,
category TEXT, -- 'news', 'earnings', 'sec_filing'
symbol TEXT,
headline TEXT,
sentiment FLOAT8, -- -1.0 ~ +1.0
confidence FLOAT8, -- 0.0 ~ 1.0
raw_data JSONB
);
SELECT create_hypertable('info_data', 'time');
alt_data (L4 另类数据)
CREATE TABLE alt_data (
time TIMESTAMPTZ NOT NULL,
source TEXT NOT NULL,
category TEXT, -- 'social', 'onchain', 'satellite'
symbol TEXT,
signal_value FLOAT8,
metadata JSONB
);
SELECT create_hypertable('alt_data', 'time');
data_sources (数据源注册表)
CREATE TABLE data_sources (
id SERIAL PRIMARY KEY,
name TEXT UNIQUE NOT NULL,
layer TEXT NOT NULL, -- 'L1', 'L2', 'L3', 'L4'
api_type TEXT, -- 'rest', 'websocket', 'file'
config JSONB DEFAULT '{}',
enabled BOOLEAN DEFAULT true,
last_sync TIMESTAMPTZ,
status TEXT DEFAULT 'idle' -- 'idle', 'syncing', 'error', 'ok'
);
文件清单
| 文件 |
说明 |
src/data/__init__.py |
包初始化 |
src/data/database.py |
SQLAlchemy async 连接池 + 表定义 |
src/data/models.py |
Pydantic 模型(请求/响应) |
scripts/init_db.py |
初始化 TimescaleDB + 建表 |
scripts/migrate_yfinance.py |
迁移现有 9 个 symbol 日线到数据库 |
迁移数据
现有 9 个 symbol 的日线数据(1998-2026)将从 yfinance CSV/zip 迁移到 market_ohlcv 表:
- SPY, QQQ, AAPL, MSFT, AMZN, GOOGL, TSLA, NVDA, META
A2. Connector 数据采集框架
目标
统一的数据源接口,每个源一个 Connector,支持拉取和实时流。
BaseConnector 接口
from abc import ABC, abstractmethod
from typing import AsyncIterator
class BaseConnector(ABC):
source_name: str
layer: str # 'L1', 'L2', 'L3', 'L4'
@abstractmethod
async def fetch(self, symbols: list[str], start: str, end: str) -> list[dict]:
"""批量拉取历史数据"""
...
async def stream(self, symbols: list[str]) -> AsyncIterator[dict]:
"""实时数据流(可选实现)"""
raise NotImplementedError
@abstractmethod
def transform(self, raw: dict) -> list[dict]:
"""原始数据 → 标准化格式"""
...
async def validate(self, data: list[dict]) -> list[dict]:
"""数据校验,过滤无效记录"""
return [d for d in data if self._is_valid(d)]
Connector 列表
| Connector |
层级 |
状态 |
文件 |
| yfinance |
L1 |
✅ 本次实现 |
yfinance_conn.py |
| Unusual Whales |
L2 |
⏳ 待接入 |
unusual_whales.py |
| FlowAlgo |
L2 |
⏳ 待接入 |
flowalgo.py |
| Polymarket |
L4 |
⏳ 待接入 |
polymarket.py |
文件清单
| 文件 |
说明 |
src/data/connectors/__init__.py |
包初始化 + 注册表 |
src/data/connectors/base.py |
BaseConnector 抽象类 |
src/data/connectors/yfinance_conn.py |
yfinance 数据源实现 |
A3. 数据调度器
目标
基于 APScheduler 的定时采集调度,按数据层级设置不同频率。
调度策略
| 层级 |
频率 |
触发时间 |
| L1 日线 |
每日 |
收盘后 16:30 ET |
| L1 分钟线 |
每5分钟 |
交易时段 9:30-16:00 ET |
| L2 Flow |
每分钟 |
交易时段 |
| L3 Info |
每15分钟 |
全天 |
| L4 另类 |
按源特性 |
自定义 |
文件
src/data/scheduler.py — DataScheduler 类
A4. 数据质量监控
检查维度
| 维度 |
说明 |
阈值 |
| 完整性 |
缺失交易日检测 |
缺失率 < 1% |
| 异常值 |
价格跳变、量能突变 |
3σ 标准差 |
| 延迟 |
数据到达延迟 |
< 5分钟 (L1日线) |
| 一致性 |
跨源数据对比 |
偏差 < 0.1% |
文件
src/data/quality.py — DataQualityChecker 类
A5. 数据查询 API
端点列表
| 方法 |
路径 |
说明 |
| GET |
/api/v1/data/ohlcv |
查询行情数据 |
| GET |
/api/v1/data/flow |
查询 Flow 数据 |
| GET |
/api/v1/data/sources |
数据源列表及状态 |
| POST |
/api/v1/data/sync/{source} |
手动触发同步 |
| GET |
/api/v1/data/quality |
数据质量报告 |
参数示例
GET /api/v1/data/ohlcv?symbol=SPY&start=2025-01-01&end=2025-12-31&timeframe=1d
Response:
{
"symbol": "SPY",
"timeframe": "1d",
"count": 252,
"data": [
{"time": "2025-01-02T00:00:00Z", "open": 470.5, "high": 472.3, ...},
...
]
}
文件
A6. Dashboard 数据管理页
页面路径
/data — 数据管理页
功能模块
- 数据源状态卡片 — 名称、层级、最后同步时间、记录数、状态灯(绿/黄/红)
- 数据覆盖热力图 — symbol × 日期矩阵,颜色深浅表示数据完整度
- 手动同步按钮 — 触发指定数据源立即同步
- 数据质量指标面板 — 完整性、异常值、延迟等指标可视化
文件
| 文件 |
说明 |
dashboard/src/pages/DataManager.jsx |
数据管理页组件 |
dashboard/src/api/client.js |
增加 data API 函数 |
dashboard/src/App.jsx |
增加 /data 路由 |
dashboard/src/components/Sidebar.jsx |
增加 Data 导航项 |
实施顺序
A1 TimescaleDB 建库建表
└──→ A2 Connector 框架
└──→ A5 数据查询 API
└──→ A6 Dashboard 页面
└──→ A3 数据调度器
└──→ A4 数据质量监控
新增依赖
sqlalchemy[asyncio]>=2.0
asyncpg>=0.29
psycopg2-binary>=2.9
apscheduler>=3.10
验证清单
最后更新: 2026-03-01